{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Principal Component Analysis (PCA)\n",
    "\n",
    "In this notebook, we will demonstrate the end-to-end workflow of Spark RAPIDS accelerated PCA."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "import time\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "No active Spark session found, initializing manually.\n",
      "File already exists. Skipping download.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "24/10/04 18:04:27 WARN Utils: Your hostname, cb4ae00-lcedt resolves to a loopback address: 127.0.1.1; using 10.110.47.100 instead (on interface eno1)\n",
      "24/10/04 18:04:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n",
      "24/10/04 18:04:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "24/10/04 18:04:27 WARN RapidsPluginUtils: RAPIDS Accelerator 25.02.1 using cudf 25.02.1, private revision 9fac64da220ddd6bf5626bd7bd1dd74c08603eac\n",
      "24/10/04 18:04:27 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.\n",
      "24/10/04 18:04:31 WARN GpuDeviceManager: RMM pool is disabled since spark.rapids.memory.gpu.pooling.enabled is set to false; however, this configuration is deprecated and the behavior may change in a future release.\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark import SparkConf\n",
    "\n",
    "def get_rapids_jar():\n",
    "    import os\n",
    "    import requests\n",
    "\n",
    "    SPARK_RAPIDS_VERSION = \"25.04.0\"\n",
    "    rapids_jar = f\"rapids-4-spark_2.12-{SPARK_RAPIDS_VERSION}.jar\"\n",
    "    if not os.path.exists(rapids_jar):\n",
    "        print(\"Downloading spark rapids jar\")\n",
    "        url = f\"https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/{SPARK_RAPIDS_VERSION}/{rapids_jar}\"\n",
    "        response = requests.get(url)\n",
    "        if response.status_code == 200:\n",
    "            with open(rapids_jar, \"wb\") as f:\n",
    "                f.write(response.content)\n",
    "            print(f\"File '{rapids_jar}' downloaded and saved successfully.\")\n",
    "        else:\n",
    "            print(f\"Failed to download the file. Status code: {response.status_code}\")\n",
    "    else:\n",
    "        print(\"File already exists. Skipping download.\")\n",
    "    return rapids_jar\n",
    "\n",
    "def initialize_spark(rapids_jar: str):\n",
    "    '''\n",
    "    If no active Spark session is found, initialize and configure a new one. \n",
    "    '''\n",
    "    import socket\n",
    "    hostname = socket.gethostname()\n",
    "\n",
    "    conf = SparkConf()\n",
    "    conf.setMaster(f\"spark://{hostname}:7077\") # Assuming master is on host and default port. \n",
    "    conf.set(\"spark.task.maxFailures\", \"1\")\n",
    "    conf.set(\"spark.driver.memory\", \"10g\")\n",
    "    conf.set(\"spark.executor.memory\", \"8g\")\n",
    "    conf.set(\"spark.rpc.message.maxSize\", \"1024\")\n",
    "    conf.set(\"spark.sql.pyspark.jvmStacktrace.enabled\", \"true\")\n",
    "    conf.set(\"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled\", \"false\")\n",
    "    conf.set(\"spark.sql.execution.arrow.pyspark.enabled\", \"true\")\n",
    "    conf.set(\"spark.python.worker.reuse\", \"true\")\n",
    "    conf.set(\"spark.rapids.ml.uvm.enabled\", \"true\")\n",
    "    conf.set(\"spark.jars\", rapids_jar)\n",
    "    conf.set(\"spark.executorEnv.PYTHONPATH\", rapids_jar)\n",
    "    conf.set(\"spark.rapids.memory.gpu.minAllocFraction\", \"0.0001\")\n",
    "    conf.set(\"spark.plugins\", \"com.nvidia.spark.SQLPlugin\")\n",
    "    conf.set(\"spark.locality.wait\", \"0s\")\n",
    "    conf.set(\"spark.sql.cache.serializer\", \"com.nvidia.spark.ParquetCachedBatchSerializer\")\n",
    "    conf.set(\"spark.rapids.memory.gpu.pooling.enabled\", \"false\")\n",
    "    conf.set(\"spark.sql.execution.sortBeforeRepartition\", \"false\")\n",
    "    conf.set(\"spark.rapids.sql.format.parquet.reader.type\", \"MULTITHREADED\")\n",
    "    conf.set(\"spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel\", \"20\")\n",
    "    conf.set(\"spark.rapids.sql.multiThreadedRead.numThreads\", \"20\")\n",
    "    conf.set(\"spark.rapids.sql.python.gpu.enabled\", \"true\")\n",
    "    conf.set(\"spark.rapids.memory.pinnedPool.size\", \"2G\")\n",
    "    conf.set(\"spark.python.daemon.module\", \"rapids.daemon\")\n",
    "    conf.set(\"spark.rapids.sql.batchSizeBytes\", \"512m\")\n",
    "    conf.set(\"spark.sql.adaptive.enabled\", \"false\")\n",
    "    conf.set(\"spark.sql.files.maxPartitionBytes\", \"512m\")\n",
    "    conf.set(\"spark.rapids.sql.concurrentGpuTasks\", \"1\")\n",
    "    conf.set(\"spark.sql.execution.arrow.maxRecordsPerBatch\", \"20000\")\n",
    "    conf.set(\"spark.rapids.sql.explain\", \"NONE\")\n",
    "    \n",
    "    spark = SparkSession.builder.appName(\"spark-rapids-ml-pca\").config(conf=conf).getOrCreate()\n",
    "    return spark\n",
    "\n",
    "# Check if Spark session is already active, if not, initialize it\n",
    "if 'spark' not in globals():\n",
    "    print(\"No active Spark session found, initializing manually.\")\n",
    "    rapids_jar = os.environ.get('RAPIDS_JAR')\n",
    "    if rapids_jar is None:\n",
    "        rapids_jar = get_rapids_jar()\n",
    "    spark = initialize_spark(rapids_jar)\n",
    "else:\n",
    "    print(\"Using existing Spark session.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate synthetic dataset\n",
    "\n",
    "Here we generate a 100,000 x 2048 random dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "24/10/04 18:04:45 WARN TaskSetManager: Stage 0 contains a task of very large size (160085 KiB). The maximum recommended task size is 1000 KiB.\n",
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "rows = 100000\n",
    "dim = 2048\n",
    "dtype = 'float32'\n",
    "np.random.seed(42)\n",
    "\n",
    "data = np.random.rand(rows, dim).astype(dtype)\n",
    "pd_data = pd.DataFrame({\"features\": list(data)})\n",
    "prepare_df = spark.createDataFrame(pd_data)\n",
    "prepare_df.write.mode(\"overwrite\").parquet(\"PCA_data.parquet\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Spark-RAPIDS-ML accepts ArrayType input\n",
    "\n",
    "Note that in the original Spark-ML PCA, we must `Vectorize` the input column:\n",
    "\n",
    "```python\n",
    "from pyspark.ml.linalg import Vectors\n",
    "data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),\n",
    "    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),\n",
    "    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]\n",
    "df = spark.createDataFrame(data,[\"features\"])\n",
    "df.show()\n",
    "```\n",
    "\n",
    "...whereas the Spark-RAPIDS-ML version does not require extra Vectorization, and can accept an ArrayType column as the input column:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- features: array (nullable = true)\n",
      " |    |-- element: float (containsNull = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "data_df = spark.read.parquet(\"PCA_data.parquet\")\n",
    "data_df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Using Spark-RAPIDS-ML PCA (GPU)\n",
    "\n",
    "Compared to the Spark-ML PCA training API:\n",
    "\n",
    "```python\n",
    "from pyspark.ml.feature import PCA\n",
    "pca = PCA(k=3, inputCol=\"features\")\n",
    "pca.setOutputCol(\"pca_features\")\n",
    "```\n",
    "\n",
    "We use a customized class which requires **no code change** from the user to enjoy GPU acceleration:\n",
    "\n",
    "```python\n",
    "from spark_rapids_ml.feature import PCA\n",
    "pca = PCA(k=3, inputCol=\"features\")\n",
    "pca.setOutputCol(\"pca_features\")\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "PCA_570681141389"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from spark_rapids_ml.feature import PCA\n",
    "\n",
    "gpu_pca = PCA(k=2, inputCol=\"features\")\n",
    "gpu_pca.setOutputCol(\"pca_features\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The PCA estimator object can be persisted and reloaded."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "estimator_path = \"/tmp/pca_estimator\"\n",
    "gpu_pca.write().overwrite().save(estimator_path)\n",
    "gpu_pca_loaded = PCA.load(estimator_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Fit"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "24/10/04 18:04:58 WARN MultiFileReaderThreadPool: Configuring the file reader thread pool with a max of 32 threads instead of spark.rapids.sql.multiThreadedRead.numThreads = 20\n",
      "2024-10-04 18:04:58,487 - spark_rapids_ml.feature.PCA - INFO - CUDA managed memory enabled.\n",
      "2024-10-04 18:04:58,570 - spark_rapids_ml.feature.PCA - INFO - Training spark-rapids-ml with 1 worker(s) ...\n",
      "INFO: Process 2762394 found CUDA visible device(s): 0\n",
      "2024-10-04 18:05:01,613 - spark_rapids_ml.feature.PCA - INFO - Loading data into python worker memory\n",
      "2024-10-04 18:05:02,551 - spark_rapids_ml.feature.PCA - INFO - Initializing cuml context\n",
      "2024-10-04 18:05:03,795 - spark_rapids_ml.feature.PCA - INFO - Invoking cuml fit\n",
      "2024-10-04 18:05:05,326 - spark_rapids_ml.feature.PCA - INFO - Cuml fit complete\n",
      "2024-10-04 18:05:06,858 - spark_rapids_ml.feature.PCA - INFO - Finished training\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "GPU PCA fit took: 8.90433144569397 sec\n"
     ]
    }
   ],
   "source": [
    "start_time = time.time()\n",
    "gpu_pca_model = gpu_pca_loaded.fit(data_df)\n",
    "gpu_fit_time = time.time() - start_time\n",
    "print(f\"GPU PCA fit took: {gpu_fit_time} sec\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Transform"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------------------------+\n",
      "|pca_features               |\n",
      "+---------------------------+\n",
      "|[0.062363233, 0.4037608]   |\n",
      "|[0.49734917, 0.703541]     |\n",
      "|[0.0035427138, 0.29358602] |\n",
      "|[-0.06798951, 0.37400067]  |\n",
      "|[0.10075127, 0.34651726]   |\n",
      "|[-0.22320557, 0.6660976]   |\n",
      "|[0.49608234, 0.6761328]    |\n",
      "|[0.25515205, 0.20352581]   |\n",
      "|[-0.5102935, 0.319284]     |\n",
      "|[-0.5109488, 0.2756377]    |\n",
      "|[0.411546, -0.17954555]    |\n",
      "|[0.21616393, -0.46268395]  |\n",
      "|[-0.0924304, 0.65660465]   |\n",
      "|[0.12355948, 0.9478601]    |\n",
      "|[0.49234354, 0.63746333]   |\n",
      "|[-0.86077166, 0.0037032962]|\n",
      "|[-0.013956882, 0.663955]   |\n",
      "|[-0.30510652, 0.02372247]  |\n",
      "|[-0.05999008, 0.28261736]  |\n",
      "|[0.36605445, 0.9674797]    |\n",
      "+---------------------------+\n",
      "only showing top 20 rows\n",
      "\n",
      "GPU PCA transform took: 0.43911027908325195 sec\n"
     ]
    }
   ],
   "source": [
    "start_time = time.time()\n",
    "embeddings = gpu_pca_model.transform(data_df).select(\"pca_features\").show(truncate=False)\n",
    "gpu_transform_time = time.time() - start_time\n",
    "print(f\"GPU PCA transform took: {gpu_transform_time} sec\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Using Spark-ML PCA (CPU)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "PCA_58add243f20d"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.ml.feature import PCA\n",
    "\n",
    "cpu_pca = PCA(k=2, inputCol=\"features\")\n",
    "cpu_pca.setOutputCol(\"pca_features\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- features: vector (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.functions import array_to_vector\n",
    "\n",
    "vector_df = data_df.select(array_to_vector(\"features\").alias(\"features\"))\n",
    "vector_df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Fit"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "24/10/04 17:07:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU PCA fit took: 63.37388610839844 sec\n"
     ]
    }
   ],
   "source": [
    "start_time = time.time()\n",
    "cpu_pca_model = cpu_pca.fit(vector_df)\n",
    "pca_fit_time = time.time() - start_time\n",
    "print(f\"CPU PCA fit took: {pca_fit_time} sec\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Transform"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------------------------------+\n",
      "|pca_features                               |\n",
      "+-------------------------------------------+\n",
      "|[0.24926765828229927,0.3425432972889563]   |\n",
      "|[-0.5175207040808384,0.48893065865444574]  |\n",
      "|[-0.2505049373829902,0.381272141155778]    |\n",
      "|[-0.39046980420292005,0.4870705091697811]  |\n",
      "|[-0.4024088726395023,0.707133448810984]    |\n",
      "|[-0.3061227832285992,0.5363554872099332]   |\n",
      "|[-0.6065136982526093,0.5205197626985932]   |\n",
      "|[-0.21870566838630084,0.6516598402789231]  |\n",
      "|[0.1910036552854184,0.6336513389989592]    |\n",
      "|[0.6139537641786907,0.6055187085018856]    |\n",
      "|[-0.026502904776425647,-0.0366087508156753]|\n",
      "|[-0.2989311781309336,-0.05136110567458389] |\n",
      "|[-0.5474468086054212,-0.18779964958125014] |\n",
      "|[-0.6644746232216499,0.10351178251944647]  |\n",
      "|[-0.12685301272617464,0.47394431583661295] |\n",
      "|[-0.4355221246718862,-0.00346289187881239] |\n",
      "|[0.6222719258951077,0.5488293416698503]    |\n",
      "|[0.04966907735703511,0.7138677407505005]   |\n",
      "|[0.6260486995906139,0.3553228450428632]    |\n",
      "|[0.16396683091519929,0.7382693234881972]   |\n",
      "+-------------------------------------------+\n",
      "only showing top 20 rows\n",
      "\n",
      "CPU PCA transform took: 0.19607114791870117 sec\n"
     ]
    }
   ],
   "source": [
    "start_time = time.time()\n",
    "embeddings = cpu_pca_model.transform(vector_df).select(\"pca_features\").show(truncate=False)\n",
    "pca_transform_time = time.time() - start_time\n",
    "print(f\"CPU PCA transform took: {pca_transform_time} sec\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Summary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU runtime: (64.02s + 0.20s)\n",
      "GPU runtime: (8.76s + 0.42s)\n",
      "End-to-end speedup: CPU / GPU = 7.00x\n"
     ]
    }
   ],
   "source": [
    "speedup = (pca_fit_time + pca_transform_time) / (gpu_fit_time + gpu_transform_time)\n",
    "print(f\"CPU runtime: ({pca_fit_time:.2f}s + {pca_transform_time:.2f}s)\")\n",
    "print(f\"GPU runtime: ({gpu_fit_time:.2f}s + {gpu_transform_time:.2f}s)\")\n",
    "print(f\"End-to-end speedup: CPU / GPU = {speedup:.2f}x\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "rapids-25.02",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
